Amazon DynamoDB zero-ETL integration with Amazon OpenSearch Service をCDKで書いてみた
はじめに
以下のブログで紹介したDynamoDBからOpenSearchへのデータ連携機能について、CDKでの書き方を紹介します。
コード全量
import { Construct } from "constructs";
import * as cdk from "aws-cdk-lib";
import * as dynamodb from "aws-cdk-lib/aws-dynamodb";
import * as iam from "aws-cdk-lib/aws-iam";
import * as s3 from "aws-cdk-lib/aws-s3";
import * as opensearch from "aws-cdk-lib/aws-opensearchservice";
import * as osis from "aws-cdk-lib/aws-osis";

type Props = cdk.StackProps & {
  /**
   * Source table. The pipeline below both exports the table to S3 and reads
   * its stream, so the table presumably needs DynamoDB Streams and
   * point-in-time recovery enabled — TODO confirm at the call site.
   */
  table: dynamodb.ITable;
};

/**
 * Wires a DynamoDB table into an OpenSearch Service domain through an
 * OpenSearch Ingestion (OSIS) pipeline: existing items are exported via S3,
 * then subsequent changes are read from the table's stream.
 */
export class SimpleStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props: Props) {
    super(scope, id, props);

    const { table } = props;

    // Target OpenSearch domain (no multi-AZ standby; removed with the stack).
    const domain = new opensearch.Domain(this, "Domain", {
      version: opensearch.EngineVersion.OPENSEARCH_2_11,
      capacity: {
        multiAzWithStandbyEnabled: false,
      },
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    });

    /**
     * S3 bucket used to stage the export of existing DynamoDB items so they
     * can be synchronized into OpenSearch.
     */
    const bucket = new s3.Bucket(this, "Bucket", {
      blockPublicAccess: s3.BlockPublicAccess.BLOCK_ALL,
      encryption: s3.BucketEncryption.S3_MANAGED,
      autoDeleteObjects: true,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    });

    /**
     * IAM role assumed by the OSIS pipeline. Trust is limited to OSIS
     * pipelines in this account via SourceAccount / SourceArn conditions.
     */
    const pipelineRole = new iam.Role(this, "IngestionRole", {
      assumedBy: new iam.ServicePrincipal("osis-pipelines.amazonaws.com", {
        conditions: {
          StringEquals: {
            "aws:SourceAccount": this.account,
          },
          ArnLike: {
            "aws:SourceArn": this.formatArn({
              service: "osis",
              resource: "pipeline",
              resourceName: "*",
            }),
          },
        },
      }),
      inlinePolicies: {
        /**
         * Permissions for writing into the OpenSearch domain.
         * @see https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-domain-access.html#pipeline-access-configure
         */
        ingestionPipeline: new iam.PolicyDocument({
          statements: [
            new iam.PolicyStatement({
              actions: ["es:DescribeDomain"],
              resources: [domain.domainArn],
            }),
            new iam.PolicyStatement({
              actions: ["es:ESHttp*"],
              resources: [`${domain.domainArn}/*`],
            }),
          ],
        }),
        /**
         * Permissions for exporting the table, reading its stream, and
         * staging the export in the bucket.
         * @see https://docs.aws.amazon.com/opensearch-service/latest/developerguide/configure-client-ddb.html#ddb-pipeline-role
         */
        dynamodbIngestion: new iam.PolicyDocument({
          statements: [
            new iam.PolicyStatement({
              sid: "allowRunExportJob",
              actions: [
                "dynamodb:DescribeTable",
                "dynamodb:DescribeContinuousBackups",
                "dynamodb:ExportTableToPointInTime",
              ],
              resources: [table.tableArn],
            }),
            new iam.PolicyStatement({
              sid: "allowCheckExportjob",
              actions: ["dynamodb:DescribeExport"],
              resources: [`${table.tableArn}/export/*`],
            }),
            new iam.PolicyStatement({
              sid: "allowReadFromStream",
              actions: [
                "dynamodb:DescribeStream",
                "dynamodb:GetRecords",
                "dynamodb:GetShardIterator",
              ],
              resources: [`${table.tableArn}/stream/*`],
            }),
            new iam.PolicyStatement({
              sid: "allowReadAndWriteToS3ForExport",
              actions: [
                "s3:GetObject",
                "s3:AbortMultipartUpload",
                "s3:PutObject",
                "s3:PutObjectAcl",
              ],
              resources: [`${bucket.bucketArn}/*`],
            }),
          ],
        }),
      },
    });

    /**
     * Domain resource policy letting the pipeline role reach the domain.
     * @see https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-domain-access.html#pipeline-access-domain
     */
    domain.addAccessPolicies(
      new iam.PolicyStatement({
        principals: [pipelineRole],
        actions: ["es:DescribeDomain", "es:ESHttp*"],
        resources: [`${domain.domainArn}/*`],
      }),
    );

    /**
     * OSIS pipeline. The configuration body is YAML, so its indentation is
     * significant; it is kept flush-left inside the template literal.
     * `\${...}` is escaped so Data Prepper (not TypeScript) evaluates
     * getMetadata(...) at runtime.
     */
    const pipeline = new osis.CfnPipeline(this, "OSISPipeline", {
      pipelineName: "simple-osis-pipeline",
      minUnits: 1,
      maxUnits: 4,
      pipelineConfigurationBody: `
version: "2"
dynamodb-pipeline:
  source:
    dynamodb:
      acknowledgments: true
      tables:
        - table_arn: ${table.tableArn}
          stream:
            start_position: LATEST
          export:
            s3_bucket: ${bucket.bucketName}
            s3_region: ${this.region}
      aws:
        sts_role_arn: ${pipelineRole.roleArn}
        region: ${this.region}
  sink:
    - opensearch:
        hosts:
          - https://${domain.domainEndpoint}
        index: table-index
        index_type: custom
        document_id: \${getMetadata("primary_key")}
        action: \${getMetadata("opensearch_action")}
        document_version: \${getMetadata("document_version")}
        document_version_type: external
        aws:
          sts_role_arn: ${pipelineRole.roleArn}
          region: ${this.region}
`,
    });

    // The pipeline only references the domain endpoint, not the resource that
    // applies addAccessPolicies() inside the Domain construct. Depending on the
    // whole construct ensures the policy granting pipelineRole access exists
    // before the pipeline starts writing.
    pipeline.node.addDependency(domain);
  }
}
解説
IAM Role
DynamoDBからOpenSearchへのデータ連携を行うための権限を持ったIAM Roleを作成します。
const pipelineRole = new iam.Role(this, "IngestionRole", {
  /**
   * As the documentation specifies, trust the `osis-pipelines.amazonaws.com`
   * service principal, restricted to OSIS pipelines in this account.
   * @see https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-domain-access.html#pipeline-access-configure
   */
  assumedBy: new iam.ServicePrincipal("osis-pipelines.amazonaws.com", {
    conditions: {
      StringEquals: {
        "aws:SourceAccount": this.account,
      },
      ArnLike: {
        "aws:SourceArn": this.formatArn({
          service: "osis",
          resource: "pipeline",
          resourceName: "*",
        }),
      },
    },
  }),
  inlinePolicies: {
    /**
     * Permissions for writing data into OpenSearch.
     * @see https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-domain-access.html#pipeline-access-configure
     */
    ingestionPipeline: new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          actions: ["es:DescribeDomain"],
          resources: [domain.domainArn],
        }),
        new iam.PolicyStatement({
          actions: ["es:ESHttp*"],
          resources: [`${domain.domainArn}/*`],
        }),
      ],
    }),
    /**
     * Permissions for reading data from DynamoDB (export + stream) and
     * staging the export in S3.
     * @see https://docs.aws.amazon.com/opensearch-service/latest/developerguide/configure-client-ddb.html#ddb-pipeline-role
     */
    dynamodbIngestion: new iam.PolicyDocument({
      statements: [
        new iam.PolicyStatement({
          sid: "allowRunExportJob",
          actions: [
            "dynamodb:DescribeTable",
            "dynamodb:DescribeContinuousBackups",
            "dynamodb:ExportTableToPointInTime",
          ],
          resources: [table.tableArn],
        }),
        new iam.PolicyStatement({
          sid: "allowCheckExportjob",
          actions: ["dynamodb:DescribeExport"],
          resources: [`${table.tableArn}/export/*`],
        }),
        new iam.PolicyStatement({
          sid: "allowReadFromStream",
          actions: [
            "dynamodb:DescribeStream",
            "dynamodb:GetRecords",
            "dynamodb:GetShardIterator",
          ],
          resources: [`${table.tableArn}/stream/*`],
        }),
        new iam.PolicyStatement({
          sid: "allowReadAndWriteToS3ForExport",
          actions: [
            "s3:GetObject",
            "s3:AbortMultipartUpload",
            "s3:PutObject",
            "s3:PutObjectAcl",
          ],
          resources: [`${bucket.bucketArn}/*`],
        }),
      ],
    }),
  },
});
OpenSearch Domainのリソースポリシー
ドキュメントの内容に従って、以下のようにリソースポリシーを設定します。
/**
 * Domain resource policy: allow the pipeline role to describe the domain and
 * call its HTTP APIs, as the documentation prescribes.
 * @see https://docs.aws.amazon.com/opensearch-service/latest/developerguide/pipeline-domain-access.html#pipeline-access-domain
 */
const pipelineDomainAccess = new iam.PolicyStatement({
  principals: [pipelineRole],
  actions: ["es:DescribeDomain", "es:ESHttp*"],
  resources: [`${domain.domainArn}/*`],
});
domain.addAccessPolicies(pipelineDomainAccess);
OSIS Pipeline
以下がOpenSearchのIngestion Pipelineを設定するためのCDKのコードです。
OSISにはまだL2 Constructが提供されていないので、L1 Constructである`osis.CfnPipeline`クラスで作成しています。
それでもDynamoDB TableのARNや、S3 Bucketの名前や、OpenSearchドメインのエンドポイントなどを簡単に参照できるので、CDKで書くメリットはあると思います。
/**
 * The configuration body is YAML, so indentation is significant; it is kept
 * flush-left inside the template literal. `\${...}` is escaped so that
 * Data Prepper (not TypeScript) evaluates getMetadata(...) at runtime.
 */
const pipeline = new osis.CfnPipeline(this, "OSISPipeline", {
  pipelineName: "simple-osis-pipeline",
  minUnits: 1,
  maxUnits: 4,
  pipelineConfigurationBody: `
version: "2"
dynamodb-pipeline:
  source:
    dynamodb:
      acknowledgments: true
      tables:
        - table_arn: ${table.tableArn}
          stream:
            start_position: LATEST
          export:
            s3_bucket: ${bucket.bucketName}
            s3_region: ${this.region}
      aws:
        sts_role_arn: ${pipelineRole.roleArn}
        region: ${this.region}
  sink:
    - opensearch:
        hosts:
          - https://${domain.domainEndpoint}
        index: table-index
        index_type: custom
        document_id: \${getMetadata("primary_key")}
        action: \${getMetadata("opensearch_action")}
        document_version: \${getMetadata("document_version")}
        document_version_type: external
        aws:
          sts_role_arn: ${pipelineRole.roleArn}
          region: ${this.region}
`,
});

// The pipeline only references the domain endpoint, not the resource that
// applies addAccessPolicies() inside the Domain construct. Depending on the
// whole construct ensures the policy granting pipelineRole access exists
// before the pipeline starts writing.
pipeline.node.addDependency(domain);
まとめ
個人的にCDKが手に馴染んでいるのもありますが、CDKで書くことで何のリソースが必要でそれぞれどのように依存しているのかが俯瞰できるのが魅力だと思います。
このデータ連携を担うOpenSearch Ingestionの基盤であるData Prepperにはまだまだ多くの機能があるようなので、引き続き検証していきたいと思っています。
以上でした!